package com.bw;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.TradeSkuOrderBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.funciton.DimAsyncFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Flink job that aggregates order-detail records per SKU over 10-second
 * event-time tumbling windows and then enriches every window result with
 * dimension attributes through a chain of asynchronous lookups.
 *
 * <p>Pipeline built by {@link #handle}:
 * <ol>
 *   <li>parse and filter the raw JSON records,</li>
 *   <li>assign event-time timestamps and watermarks,</li>
 *   <li>key by order-detail {@code id} and turn repeated records for the same id into
 *       deltas so that downstream sums are not inflated,</li>
 *   <li>key by {@code sku_id}, window and sum the amounts,</li>
 *   <li>join SKU, SPU, trademark, category 3/2/1 and shop dimensions asynchronously.</li>
 * </ol>
 */
public class DwsTradeSkuOrderAsyncCacheRealWindow extends BaseApp {

    /** Timeout, in seconds, applied to every asynchronous dimension lookup. */
    private static final long DIM_LOOKUP_TIMEOUT_SECONDS = 100L;

    /**
     * Job entry point. Delegates to {@code BaseApp#start}; the meaning of the four
     * arguments is defined by {@code BaseApp}, which is not visible in this file.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new DwsTradeSkuOrderAsyncCacheRealWindow().start(Constant.TOPIC_DWD_TRADE_ORDER_DETAIL,Constant.TOPIC_DWD_TRADE_ORDER_DETAIL,4,10029);
    }

    /**
     * Returns {@code value}, or {@link BigDecimal#ZERO} when {@code value} is {@code null}.
     * Used so that a missing amount in a record or in state never causes a
     * {@link NullPointerException} during arithmetic.
     */
    private static BigDecimal zeroIfNull(BigDecimal value) {
        return value == null ? BigDecimal.ZERO : value;
    }

    /**
     * Builds the streaming topology on top of the raw string source.
     *
     * @param env              the execution environment (not used directly here)
     * @param dataStreamSource stream of raw JSON strings, one order-detail record each
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1. ETL: parse the JSON and drop records that cannot be processed.
        SingleOutputStreamOperator<JSONObject> etlStream = dataStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                try {
                    // The upstream topic is fed by a retract stream, so null payloads can occur and must be skipped.
                    if (s == null) {
                        return;
                    }
                    JSONObject jsonObject = JSON.parseObject(s);
                    if (jsonObject == null) {
                        // e.g. an empty string parses to null
                        return;
                    }
                    String skuId = jsonObject.getString("sku_id");
                    String id = jsonObject.getString("id");
                    Long ts = jsonObject.getLong("ts");
                    // "id" is the key of the de-duplication step and "sku_id" the key of the
                    // window; a null key would fail the job at keyBy, so such records are dropped here.
                    if (skuId != null && id != null && ts != null) {
                        // Multiply by 1000 so that "ts" can be used as an epoch-millisecond event time
                        // (presumably the incoming value is in seconds - confirm against the producer).
                        jsonObject.put("ts", ts * 1000);
                        // Mock shop id in the range [1, 10]; the source record carries no shop information.
                        jsonObject.put("shop_id",  new Random().nextInt(10) + 1);
                        collector.collect(jsonObject);
                    }
                } catch (Exception e) {
                    // Best effort: a malformed record is reported and skipped instead of failing the job.
                    e.printStackTrace();
                }
            }
        });
        etlStream.print();

        // 2. Assign event-time timestamps and watermarks (3 seconds of allowed out-of-orderness).
        SingleOutputStreamOperator<JSONObject> wmStream = etlStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject jsonObject, long l) {
                return jsonObject.getLong("ts");
            }
        }));

        // 3. Key by order-detail id so that repeated records of the same detail share state.
        KeyedStream<JSONObject, String> keyedStream = wmStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getString("id");
            }
        });

        // 4. Prevent double counting downstream.
        /*
         * Strategy used here: emit deltas.
         *                                                  emitted downstream
         *   {"id":10,"split_total_amount":10} ---> 10       10 - 0  = 10
         *   {"id":10,"split_total_amount":15} ---> 15       15 - 10 = 5
         * Advantage: low latency, every record is forwarded immediately.
         *
         * Alternative (not used): buffer per id, <id,[list]> e.g. <1,[10,15]>.
         *   When the first record of an id arrives (state is empty) register a 10-second timer;
         *   when it fires, emit only the last element of the list and clear the state.
         *   Disadvantage: higher latency, every record waits the full 10 seconds.
         */
        SingleOutputStreamOperator<TradeSkuOrderBean> processStream = keyedStream.process(new ProcessFunction<JSONObject, TradeSkuOrderBean>() {

            /** Last seen amounts for the current order-detail id, keyed by metric name. */
            private MapState<String, BigDecimal> mapState;

            /**
             * Creates the keyed state that remembers the last amounts per order-detail id.
             * Entries expire 30 seconds after they were written.
             *
             * @param parameters operator configuration (unused)
             * @throws Exception if the state cannot be obtained
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, BigDecimal> stateDescriptor = new MapStateDescriptor<String, BigDecimal>("order_state", String.class, BigDecimal.class);
                stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(30L)).build());
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            /**
             * Emits, for each incoming record, the difference between its amounts and the
             * amounts previously seen for the same order-detail id, then stores the new amounts.
             *
             * @param jsonObject the cleaned order-detail record
             * @param context    process-function context (unused)
             * @param collector  receives one delta bean per input record
             * @throws Exception if state access fails
             */
            @Override
            public void processElement(JSONObject jsonObject, ProcessFunction<JSONObject, TradeSkuOrderBean>.Context context, Collector<TradeSkuOrderBean> collector) throws Exception {
                // Previously seen amounts for this id; zero when this is the first record.
                BigDecimal previousTotalAmount = zeroIfNull(mapState.get("splitTotalAmount"));       // apportioned total
                BigDecimal previousActivityAmount = zeroIfNull(mapState.get("splitActivityAmount")); // activity reduction
                BigDecimal previousCouponAmount = zeroIfNull(mapState.get("splitCouponAmount"));     // coupon reduction
                BigDecimal previousOriginalAmount = zeroIfNull(mapState.get("originalAmount"));      // original amount

                // Measures carried by the current record; missing fields count as zero.
                BigDecimal splitTotalAmount = zeroIfNull(jsonObject.getBigDecimal("split_total_amount"));
                BigDecimal splitActivityAmount = zeroIfNull(jsonObject.getBigDecimal("split_activity_amount"));
                BigDecimal splitCouponAmount = zeroIfNull(jsonObject.getBigDecimal("split_coupon_amount"));
                BigDecimal skuNum = zeroIfNull(jsonObject.getBigDecimal("sku_num"));
                BigDecimal orderPrice = zeroIfNull(jsonObject.getBigDecimal("order_price"));

                // Original amount = unit price * quantity.
                BigDecimal originalAmount = orderPrice.multiply(skuNum);

                String skuId = jsonObject.getString("sku_id");
                String skuName = jsonObject.getString("sku_name");
                String id = jsonObject.getString("id");
                Long ts = jsonObject.getLong("ts");
                String shopId = jsonObject.getString("shop_id");

                // Forward the delta immediately.
                collector.collect(TradeSkuOrderBean.builder()
                        .orderDetailId(id)
                        .skuName(skuName)
                        .ts(ts)
                        .skuId(skuId)
                        .shopId(shopId)
                        .originalAmount(originalAmount.subtract(previousOriginalAmount))
                        .activityReduceAmount(splitActivityAmount.subtract(previousActivityAmount))
                        .couponReduceAmount(splitCouponAmount.subtract(previousCouponAmount))
                        .orderAmount(splitTotalAmount.subtract(previousTotalAmount))
                        .build());

                // Remember the current amounts for the next record of this id. All four metrics
                // must be stored: a metric that is read above but never written here would be
                // emitted in full on every repeated record and inflate the windowed sum.
                mapState.put("splitTotalAmount", splitTotalAmount);
                mapState.put("splitActivityAmount", splitActivityAmount);
                mapState.put("splitCouponAmount", splitCouponAmount);
                mapState.put("originalAmount", originalAmount);
            }
        });

        // 5. Key by sku_id.
        KeyedStream<TradeSkuOrderBean, String> skuKeyedStream = processStream.keyBy(new KeySelector<TradeSkuOrderBean, String>() {
            @Override
            public String getKey(TradeSkuOrderBean tradeSkuOrderBean) throws Exception {
                return tradeSkuOrderBean.getSkuId();
            }
        });

        // 6. Sum the amounts per SKU in 10-second tumbling event-time windows.
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceStream = skuKeyedStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)))
                .reduce(new ReduceFunction<TradeSkuOrderBean>() {
                    @Override
                    public TradeSkuOrderBean reduce(TradeSkuOrderBean value1, TradeSkuOrderBean value2) throws Exception {
                        value1.setOriginalAmount(value1.getOriginalAmount().add(value2.getOriginalAmount()));
                        value1.setCouponReduceAmount(value1.getCouponReduceAmount().add(value2.getCouponReduceAmount()));
                        value1.setActivityReduceAmount(value1.getActivityReduceAmount().add(value2.getActivityReduceAmount()));
                        value1.setOrderAmount(value1.getOrderAmount().add(value2.getOrderAmount()));
                        return value1;
                    }
                }, new ProcessWindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>.Context context, Iterable<TradeSkuOrderBean> iterable, Collector<TradeSkuOrderBean> collector) throws Exception {
                        // The window function is only needed to stamp the window bounds on the result.
                        TimeWindow window = context.window();
                        long start = window.getStart();
                        long end = window.getEnd();
                        for (TradeSkuOrderBean tradeSkuOrderBean : iterable) {
                            tradeSkuOrderBean.setStt(DateFormatUtil.tsToDateTime(start));
                            tradeSkuOrderBean.setEdt(DateFormatUtil.tsToDateTime(end));
                            // Derive the date from the window start, not the wall clock, so that it
                            // stays consistent with stt/edt when data is late or replayed.
                            tradeSkuOrderBean.setCurDate(DateFormatUtil.tsToDate(start));
                            collector.collect(tradeSkuOrderBean);
                        }
                    }
                });

        // 7. Asynchronous dimension joins. Each step reads an id set by an earlier step,
        //    so the joins are chained one after another.
        // 7.1 SKU
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction<TradeSkuOrderBean>("dim_sku_info") {
            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getSkuId();
            }

            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean,JSONObject dimInfo) {
                tradeSkuOrderBean.setSpuId(dimInfo.getString("spu_id"));
                tradeSkuOrderBean.setTrademarkId(dimInfo.getString("tm_id"));
                tradeSkuOrderBean.setCategory3Id(dimInfo.getString("category3_id"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.2 SPU
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream1 = AsyncDataStream.unorderedWait(mapStream, new DimAsyncFunction<TradeSkuOrderBean>("dim_spu_info") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getSpuId();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setSpuName(dimInfo.getString("spu_name"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.3 Trademark
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream2 = AsyncDataStream.unorderedWait(mapStream1, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_trademark") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getTrademarkId();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setTrademarkName(dimInfo.getString("tm_name"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.4 Category 3
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream3 = AsyncDataStream.unorderedWait(mapStream2, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category3") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getCategory3Id();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setCategory3Name(dimInfo.getString("name"));
                tradeSkuOrderBean.setCategory2Id(dimInfo.getString("category2_id"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.5 Category 2
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream4 = AsyncDataStream.unorderedWait(mapStream3, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category2") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getCategory2Id();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setCategory2Name(dimInfo.getString("name"));
                tradeSkuOrderBean.setCategory1Id(dimInfo.getString("category1_id"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.6 Category 1
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream5 = AsyncDataStream.unorderedWait(mapStream4, new DimAsyncFunction<TradeSkuOrderBean>("dim_base_category1") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getCategory1Id();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setCategory1Name(dimInfo.getString("name"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.7 Shop. Must consume the category-1 stream (mapStream5); reading an earlier
        //     stream would bypass the category-1 join and lose category1Name.
        SingleOutputStreamOperator<TradeSkuOrderBean> mapStream6 = AsyncDataStream.unorderedWait(mapStream5, new DimAsyncFunction<TradeSkuOrderBean>("dim_shop") {

            @Override
            public String getDimId(TradeSkuOrderBean tradeSkuOrderBean) {
                return tradeSkuOrderBean.getShopId();
            }
            @Override
            public void setDim(TradeSkuOrderBean tradeSkuOrderBean, JSONObject dimInfo) {
                tradeSkuOrderBean.setShopName(dimInfo.getString("name"));
            }
        }, DIM_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 8. Write to Doris.
        // TODO: no sink is attached yet; mapStream6 is the fully enriched stream to write.

    }
}
